3. Data Parallelism

Focusing on data instead of processes helps a great deal to create robust concurrent programs. You as a programmer define your data together with functions that should be applied to it and then let the underlying machinery process the data. Typically a set of concurrent tasks will be created and submitted to a thread pool for processing.

In GPars the GParsPool and GParsExecutorsPool classes give you access to low-level data parallelism techniques. The GParsPool class relies on the jsr-166y Fork/Join framework and so offers greater functionality and better performance, whereas GParsExecutorsPool uses good old Java executors and so is easier to set up in a managed or restricted environment.

There are three fundamental domains covered by the GPars low-level data parallelism:

  1. Processing collections concurrently
  2. Running functions (closures) asynchronously
  3. Performing Fork/Join (Divide/Conquer) algorithms

3.1 Parallel Collections

Dealing with data frequently involves manipulating collections. Lists, arrays, sets, maps, iterators, strings and a lot of other data types can be viewed as collections of items. The common pattern to process such collections is to take the elements sequentially, one by one, and perform an action on each item in turn.

Take, for example, the min() function, which is supposed to return the smallest element of a collection. When you call the min() method on a collection of numbers, the caller thread creates an accumulator holding the so-far-the-smallest value, initialized, say, to the first element of the collection. The thread then iterates through the elements of the collection and compares each of them with the value in the accumulator. Once all elements have been processed, the minimum value is stored in the accumulator.
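
A minimal sequential sketch of the algorithm described above (the helper name minValue is made up purely for illustration):

def minValue(List<Integer> numbers) {
    def accumulator = numbers[0]                         //the so-far-the-smallest value
    for (number in numbers) {
        if (number < accumulator) accumulator = number   //compare each element with the accumulator
    }
    return accumulator
}

assert 1 == minValue([8, 3, 5, 1, 9, 2])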

This algorithm, however simple, is totally wrong on multi-core hardware. Running the min() function on a dual-core chip can leverage at most 50% of the computing power of the chip. On a quad-core it would be only 25%. That's right - this algorithm effectively wastes 75% of the computing power of the chip.

Tree-like structures proved to be more appropriate for parallel processing. The min() function in our example doesn't need to iterate through all the elements in a row and compare their values with the accumulator. What it can do instead is rely on the multi-core nature of your hardware. A parallel_min() function could, for example, compare pairs (or tuples of a certain size) of neighboring values in the collection and promote the smallest value from each tuple into the next round of comparison. Searching for the minimum in different tuples can safely happen in parallel, and so tuples in the same round can be processed by different cores at the same time without races or contention among threads.
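
As a quick taste of what this looks like in GPars (a minimal sketch using the parallel map/reduce DSL and the withPool() block, both explained in the following sections), the tree-like reduction happens for you under the covers:

GParsPool.withPool {
    def numbers = [8, 3, 5, 1, 9, 2]
    //pairs of elements are compared concurrently on the pool threads
    assert 1 == numbers.parallel.min {it}
}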

Meet Parallel Arrays

The jsr-166y library brings a very convenient abstraction called Parallel Arrays . GPars leverages the Parallel Arrays implementation in several ways. The GParsPool and GParsExecutorsPool classes provide parallel variants of the common Groovy iteration methods like each() , collect() , findAll() and such.

def selfPortraits = images.findAllParallel{it.contains me}.collectParallel {it.resize()}

It also allows for a more functional style map/reduce collection processing.

def smallestSelfPortrait = images.parallel.filter{it.contains me}.map{it.resize()}.min{it.sizeInMB}

3.1.1 GParsPool

Use of GParsPool - the JSR-166y based concurrent collection processor

Usage of GParsPool

The GParsPool class enables a ParallelArray-based (from JSR-166y) concurrency DSL for collections and objects.

Examples of use:

//summarize numbers concurrently
 GParsPool.withPool {
     final AtomicInteger result = new AtomicInteger(0)
     [1, 2, 3, 4, 5].eachParallel {result.addAndGet(it)}
     assertEquals 15, result.get()
 }

//multiply numbers asynchronously
 GParsPool.withPool {
     final List result = [1, 2, 3, 4, 5].collectParallel {it * 2}
     assert ([2, 4, 6, 8, 10].equals(result))
 }

The passed-in closure takes an instance of a ForkJoinPool as a parameter, which can then be used freely inside the closure.
//check whether all elements within a collection meet certain criteria
 GParsPool.withPool(5) {ForkJoinPool pool ->
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }
The GParsPool.withPool() method takes optional parameters for number of threads in the created pool and an unhandled exception handler.
withPool(10) {...}
withPool(20, exceptionHandler) {...}
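
A sketch of such a handler (assuming the second parameter is a java.lang.Thread.UncaughtExceptionHandler, coerced here from a closure):

def exceptionHandler = {Thread t, Throwable e ->
    println "Uncaught exception in thread $t.name: $e.message"   //log-and-continue handler
} as Thread.UncaughtExceptionHandler

GParsPool.withPool(20, exceptionHandler) {
    [1, 2, 3].eachParallel {println it}
}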

The GParsPool.withExistingPool() method takes an already existing ForkJoinPool instance to reuse. The DSL is valid only within the associated block of code and only for the thread that has called the withPool() or withExistingPool() method. The withPool() method returns only after all the worker threads have finished their tasks and the pool has been destroyed, and it returns the return value of the associated block of code. The withExistingPool() method doesn't wait for the pool threads to finish.

Alternatively, the GParsPool class can be statically imported (import static groovyx.gpars.GParsPool.*), which allows omitting the GParsPool class name.

withPool {
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }

The following methods are currently supported on all objects in Groovy:

Meta-class enhancer

As an alternative you can use the ParallelEnhancer class to enhance meta-classes of any classes or individual instances with the parallel methods.

import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ParallelEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale']
ParallelEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found')
println (animals.everyParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')

When using the ParallelEnhancer class, you're not restricted to a withPool() block as you are with the GParsPool DSL. The enhanced classes or instances remain enhanced until they get garbage collected.

Exception handling

If an exception is thrown while processing any of the passed-in closures, the first exception gets re-thrown from the xxxParallel methods and the algorithm stops as soon as possible.

The exception handling mechanism of GParsPool builds on the one built into the Fork/Join framework. Since Fork/Join algorithms are by nature hierarchical, once any part of the algorithm fails, there's usually little benefit from continuing the computation, since some branches of the algorithm will never return a result.

Bear in mind that the GParsPool implementation doesn't give any guarantees about its behavior after a first unhandled exception occurs, beyond stopping the algorithm and re-throwing the first detected exception to the caller. This behavior, after all, is consistent with what the traditional sequential iteration methods do.
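
A small illustration (a sketch; the exception type and message are arbitrary) of how a failure in one of the closures surfaces at the call site:

GParsPool.withPool {
    try {
        [1, 2, 3, 4, 5].eachParallel {
            if (it == 3) throw new IllegalStateException("Failed on $it")   //one of the tasks fails
        }
    } catch (IllegalStateException e) {
        println "The first detected exception was re-thrown: $e.message"
    }
}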

Transparently parallel collections

On top of adding new xxxParallel() methods, GPars can also let you change the semantics of the original iteration methods. For example, you may be passing a collection into a library method that will process your collection in a sequential way, say using the collect() method. By changing the semantics of the collect() method on your collection you can effectively parallelize the library's sequential code.

GParsPool.withPool {
    //The selectImportantNames() will process the name collections concurrently
    assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeTransparent())
}

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}

Transparent parallelism is also available in combination with ParallelEnhancer.

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}

def names = ['Joe', 'Alice', 'Dave', 'Jason']
ParallelEnhancer.enhanceInstance(names)
//The selectImportantNames() will process the name collections concurrently
assert ['ALICE', 'JASON'] == selectImportantNames(names.makeTransparent())

Dependency resolution

For the GParsPool class to work, the jsr166y-070108.jar must be on the classpath.

<dependency>
    <groupId>org.coconut.forkjoin</groupId>
    <artifactId>jsr166y</artifactId>
    <version>070108</version>
</dependency>

Avoid side-effects in functions

We have to warn you. Since the closures that are provided to the parallel methods like eachParallel() or collectParallel() may be run in parallel, you have to make sure that each of the closures is written in a thread-safe manner. The closures must hold no internal state, share no data and have no side-effects beyond the boundaries of the single element they've been invoked on. Violations of these rules open the door for race conditions and deadlocks, the most severe enemies of a modern multi-core programmer.

Don't do this:

def thumbnails = []
images.eachParallel {thumbnails << it.thumbnail}  //Concurrently accessing a not-thread-safe collection of thumbnails, don't do this!
At least, you've been warned.
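
If all you need is the collection of transformed items, a safer sketch is to let collectParallel() assemble the result collection for you:

def thumbnails = images.collectParallel {it.thumbnail}  //the result collection is built safely under the covers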

3.1.2 GParsExecutorsPool

Use of GParsExecutorsPool - the Java Executors' based concurrent collection processor

Usage of GParsExecutorsPool

The GParsExecutorsPool class enables a Java Executors-based concurrency DSL for collections and objects.

The GParsExecutorsPool class can be used as a pure-JDK-based parallel collection processor. Unlike the GParsPool class, GParsExecutorsPool doesn't require the jsr-166y jar file, but leverages the standard JDK executor services to parallelize closures processing collections or objects iteratively. It needs to be stated, however, that GParsPool typically performs much better than GParsExecutorsPool does.

Examples of use:

//multiply numbers asynchronously
 GParsExecutorsPool.withPool {
     Collection<Future> result = [1, 2, 3, 4, 5].collectParallel{it * 10}
     assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result*.get()))
 }

//multiply numbers asynchronously using an asynchronous closure
 GParsExecutorsPool.withPool {
     def closure = {it * 10}
     def asyncClosure = closure.async()
     Collection<Future> result = [1, 2, 3, 4, 5].collect(asyncClosure)
     assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result*.get()))
 }

The passed-in closure takes an instance of an ExecutorService as a parameter, which can then be used freely inside the closure.
//find an element meeting specified criteria
 GParsExecutorsPool.withPool(5) {ExecutorService service ->
     service.submit({performLongCalculation()} as Runnable)
 }
The GParsExecutorsPool.withPool() method takes optional parameters for number of threads in the created pool and a thread factory.
withPool(10) {...}
withPool(20, threadFactory) {...}
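
A sketch of a custom thread factory (ThreadFactory is the standard java.util.concurrent interface, coerced here from a closure):

import java.util.concurrent.ThreadFactory

def threadFactory = {Runnable runnable ->
    def thread = new Thread(runnable, 'gpars-worker')
    thread.daemon = true                        //don't prevent JVM shutdown
    return thread
} as ThreadFactory

GParsExecutorsPool.withPool(20, threadFactory) {
    [1, 2, 3].eachParallel {println it}
}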

The GParsExecutorsPool.withExistingPool() method takes an already existing executor service instance to reuse. The DSL is valid only within the associated block of code and only for the thread that has called the withPool() or withExistingPool() method. The withPool() method returns only after all the worker threads have finished their tasks and the executor service has been destroyed, and it returns the return value of the associated block of code. The withExistingPool() method doesn't wait for the executor service threads to finish.

Alternatively, the GParsExecutorsPool class can be statically imported (import static groovyx.gpars.GParsExecutorsPool.*), which allows omitting the GParsExecutorsPool class name.

withPool {
     def result = [1, 2, 3, 4, 5].findParallel{Number number -> number > 2}
     assert result in [3, 4, 5]
 }
The following methods on all objects, which support iterations in Groovy, are currently supported:

Meta-class enhancer

As an alternative you can use the GParsExecutorsPoolEnhancer class to enhance meta-classes for any classes or individual instances with asynchronous methods.

import groovyx.gpars.GParsExecutorsPoolEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
GParsExecutorsPoolEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale']
GParsExecutorsPoolEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found')
println (animals.allParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')

When using the GParsExecutorsPoolEnhancer class, you're not restricted to a withPool() block as you are with the GParsExecutorsPool DSL. The enhanced classes or instances remain enhanced until they get garbage collected.

Exception handling

If exceptions are thrown while processing any of the passed-in closures, an instance of AsyncException wrapping all the original exceptions gets re-thrown from the xxxParallel methods.
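
A minimal sketch (the exception thrown inside the closure is arbitrary; the wrapping exception is caught generically here and, per the rule above, is expected to be an AsyncException):

GParsExecutorsPool.withPool {
    try {
        [1, 2, 3].eachParallel {throw new RuntimeException("Failed on $it")}
    } catch (Exception e) {
        println "Caught ${e.class.simpleName}: $e.message"
    }
}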

Avoid side-effects in functions

Once again we need to warn you about using closures with side-effects affecting objects beyond the scope of the single currently processed element, or closures which keep state. Don't do that! It is dangerous to pass them to any of the xxxParallel() methods.

3.1.3 Memoize

The memoize function enables caching of a function's return values. Repeated calls to the memoized function with the same argument values will, instead of invoking the calculation encoded in the original function, retrieve the result value from an internal transparent cache. Provided the calculation is considerably slower than retrieving a cached value, this allows users to trade memory for performance. Check out the example, where we attempt to scan multiple websites for particular content:

The memoize functionality of GPars has been contributed to Groovy in version 1.8 and if you run on Groovy 1.8 or later, it is recommended to use the Groovy functionality. Memoize in GPars is almost identical, except that it searches the memoize caches concurrently using the surrounding thread pool and so may give performance benefits in some scenarios.

The GPars memoize functionality has been renamed to avoid future conflicts with the memoize functionality in Groovy. GPars now calls the methods with a preceding letter g, such as gmemoize().

Examples of use

GParsPool.withPool {
    def urls = ['http://www.dzone.com', 'http://www.theserverside.com', 'http://www.infoq.com']
    Closure download = {url ->
        println "Downloading $url"
        url.toURL().text.toUpperCase()
    }
    Closure cachingDownload = download.gmemoize()

    println 'Groovy sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GROOVY')}
    println 'Grails sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRAILS')}
    println 'Griffon sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRIFFON')}
    println 'Gradle sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRADLE')}
    println 'Concurrency sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('CONCURRENCY')}
    println 'GPars sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GPARS')}
}

Notice that closures are enhanced inside the GParsPool.withPool() blocks with a gmemoize() function, which returns a new closure wrapping the original closure with a cache. In the example we're calling the cachingDownload function in several places in the code; however, each unique url gets downloaded only once - the first time it is needed. The values are then cached and available for subsequent calls, and also to all threads, no matter which thread originally issued the first download request for the particular url and had to handle the actual calculation/download.

So, to wrap up, memoize shields a function by a cache of past return values. However, memoize can do even more. In some algorithms adding a little memory may have dramatic impact on the computational complexity of the calculation. Let's look at a classical example of Fibonacci numbers.

Fibonacci example

A purely functional, recursive implementation, following closely the definition of Fibonacci numbers is exponentially complex:

Closure fib = {n -> n > 1 ? call(n - 1) + call(n - 2) : n}

Try calling the fib function with numbers around 30 and you'll see how slow it is.

Now with a little twist and added memoize cache the algorithm magically turns into a linearly complex one:

Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.gmemoize()

The extra memory we added cut off all but one recursive branch of the calculation. And all subsequent calls to the same fib function will also benefit from the cached values.

Also, see below, how the memoizeAtMost variant can reduce memory consumption in our example, yet preserve the linear complexity of the algorithm.

Available variants

memoize

The basic variant, which keeps values in the internal cache for the whole lifetime of the memoized function. Provides the best performance characteristics of all the variants.

memoizeAtMost

Allows the user to set a hard limit on the number of items cached. Once the limit has been reached, each subsequently added value will evict the oldest value from the cache using the LRU (Least Recently Used) strategy.

So for our Fibonacci number example, we could safely reduce the cache size to two items:

Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.memoizeAtMost(2)

Setting an upper limit on the cache size may have two purposes:

  1. Keep the memory footprint of the cache within defined boundaries
  2. Preserve desired performance characteristics of the function. Too large caches may take longer to retrieve the cached value than it would have taken to calculate the result directly.

memoizeAtLeast

Allows unlimited growth of the internal cache until the JVM's garbage collector decides to step in and evict the SoftReferences used by our implementation. The single parameter value passed to the memoizeAtLeast() method specifies the minimum number of cached items that should be protected from gc eviction. The cache will never shrink below the specified number of entries. It protects only the most recently used items from eviction, using the LRU (Least Recently Used) strategy.
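
For illustration, the Fibonacci closure again, this time protecting at least two entries from gc eviction while letting the cache grow as memory permits (a sketch following the same naming convention as the memoizeAtMost example above):

Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.memoizeAtLeast(2)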

memoizeBetween

Combines memoizeAtLeast and memoizeAtMost, allowing the cache to grow and shrink within the range between the two parameter values depending on available memory and gc activity, while the cache size never exceeds the upper limit, so as to preserve the desired performance characteristics of the cache.
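
A sketch following the same naming convention, keeping at least 2 and at most 100 cached entries:

Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.memoizeBetween(2, 100)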

3.2 Map-Reduce

The Parallel Collection Map/Reduce DSL gives GPars a more functional flavor. In general, the Map/Reduce DSL may be used for the same purpose as the xxxParallel() family methods and has very similar semantics. On the other hand, Map/Reduce can perform considerably faster, if you need to chain multiple methods to process a single collection in multiple steps:
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
            .map {it.toURL().text.toUpperCase()}
            .filter {it.contains('GROOVY')}
            .map{it.split()}
            .map{it.findAll{word -> word.contains 'GROOVY'}.size()}
            .sum()

The xxxParallel() methods have to follow the contract of their non-parallel peers. So a collectParallel() method must return a legal collection of items, which you can again treat as a Groovy collection. Internally the parallel collect method builds an efficient parallel structure, called a Parallel Array, performs the required operation concurrently and, before returning, destroys the Parallel Array, building the collection of results to return to you. A potential call to, say, findAllParallel() on the resulting collection would repeat the whole process of constructing and destroying a Parallel Array instance under the covers.

With Map/Reduce you turn your collection into a Parallel Array and back only once. The Map/Reduce family of methods do not return Groovy collections, but are free to pass along the internal Parallel Arrays directly. Invoking the parallel property on a collection will build a Parallel Array for the collection and return a thin wrapper around the Parallel Array instance. You can then chain all the required methods on the wrapper.
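
A minimal sketch of such a chain, reusing the filter, map and sum operations shown elsewhere in this chapter:

GParsPool.withPool {
    def sumOfDoubledEvens = (1..1000).parallel
            .filter {it % 2 == 0}   //keep even numbers only
            .map {it * 2}           //double each of them
            .sum()                  //reduce to a single value
    println sumOfDoubledEvens
}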

Returning back to a plain Groovy collection instance is always just a matter of retrieving the collection property.

def myNumbers = (1..1000).parallel.filter{it % 2 == 0}.map{Math.sqrt it}.collection

Avoid side-effects in functions

Once again we need to warn you. To avoid nasty surprises, please keep the closures you pass to the Map/Reduce functions stateless and free of side-effects.

Availability

This feature is only available when using the Fork/Join-based GParsPool, not GParsExecutorsPool.

Classical Example

A classical example, inspired by http://github.com/thevery, counting occurrences of words in a string:

import static groovyx.gpars.GParsPool.withPool

def words = "This is just a plain text to count words in"
print count(words)

def count(arg) {
    withPool {
        return arg.parallel
                .map{[it, 1]}
                .groupBy{it[0]}.getParallel()
                .map {it.value = it.value.size(); it}
                .sort{-it.value}.collection
    }
}

The same example, now implemented using the more general combine operation:

def words = "This is just a plain text to count words in"
print count(words)

def count(arg) {
    withPool {
        return arg.parallel
                .map{[it, 1]}
                .combine(0) {sum, value -> sum + value}.getParallel()
                .sort{-it.value}.collection
    }
}

Combine

The combine operation expects on its input a list of tuples (two-element lists) considered to be key-value pairs (such as [key1, value1, key2, value2, key1, value3, key3, value4 … ] ) with potentially repeating keys. When invoked, combine merges the values for identical keys using the provided accumulator function and produces a map mapping the original (unique) keys to their accumulated values. E.g. [a, b, c, d, a, e, c, f] will be combined into a : b+e, c : d+f, while the '+' operation on the values needs to be provided by the user as the accumulation closure.

The accumulation function argument needs to specify a function to use for combining (accumulating) the values belonging to the same key. An initial accumulator value needs to be provided as well. Since the combine method processes items in parallel, the initial accumulator value will be reused multiple times. Thus the provided value must allow for reuse. It should be either a cloneable or immutable value or a closure returning a fresh initial accumulator each time requested. Good combinations of accumulator functions and reusable initial values include:

accumulator = {List acc, value -> acc << value} initialValue = []
accumulator = {List acc, value -> acc << value} initialValue = {-> []}
accumulator = {int sum, int value -> sum + value} initialValue = 0
accumulator = {int sum, int value -> sum + value} initialValue = {-> 0}
accumulator = {ShoppingCart cart, Item value -> cart.addItem(value)} initialValue = {-> new ShoppingCart()}

The return type is a map. E.g. ['he', 1, 'she', 2, 'he', 2, 'me', 1, 'she', 5, 'he', 1] with the initial value provided as 0 will be combined into ['he' : 4, 'she' : 7, 'me' : 1].
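
A minimal runnable sketch of this (assuming, as in the word-counting example above, that the key-value pairs are supplied as two-element lists):

GParsPool.withPool {
    def pairs = [['he', 1], ['she', 2], ['he', 2], ['me', 1], ['she', 5], ['he', 1]]
    def combined = pairs.parallel.combine(0) {sum, value -> sum + value}   //accumulate values per key
    assert ['he': 4, 'she': 7, 'me': 1] == combined
}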

3.3 Parallel Arrays

As an alternative, the efficient tree-based data structures defined in JSR-166y can be used directly. The parallelArray property on any collection or object will return a jsr166y.forkjoin.ParallelArray instance holding the elements of the original collection, which can then be manipulated through the jsr166y API. Please refer to the jsr166y documentation for the API details.

groovyx.gpars.GParsPool.withPool {
    assert 15 == [1, 2, 3, 4, 5].parallelArray.reduce({a, b -> a + b} as Reducer, 0)                                        //summarize
    assert 55 == [1, 2, 3, 4, 5].parallelArray.withMapping({it ** 2} as Mapper).reduce({a, b -> a + b} as Reducer, 0)       //summarize squares
    assert 20 == [1, 2, 3, 4, 5].parallelArray.withFilter({it % 2 == 0} as Predicate)                                       //summarize squares of even numbers
            .withMapping({it ** 2} as Mapper)
            .reduce({a, b -> a + b} as Reducer, 0)

    assert 'aa:bb:cc:dd:ee' == 'abcde'.parallelArray                                                                        //concatenate duplicated characters with separator
            .withMapping({it * 2} as Mapper)
            .reduce({a, b -> "$a:$b"} as Reducer, "")
}

3.4 Asynchronous Invocation

Running long-lasting tasks in the background is a need that arises quite frequently. Your main thread of execution wants to initiate a few calculations, downloads, searches or the like, but the results may not be needed immediately. GPars gives developers the tools to schedule such asynchronous activities for processing in the background and collect the results once they're needed.

Usage of GParsPool and GParsExecutorsPool asynchronous processing facilities

Both GParsPool and GParsExecutorsPool provide almost identical services in this domain, although they leverage different underlying machinery, based on which of the two classes the user chooses.

Closures enhancements

The following methods are added to closures inside the GPars(Executors)Pool.withPool() blocks:

  1. async() - returns a new closure which, when invoked, schedules the original closure for processing in the thread pool and returns a Future for the result value
  2. callAsync() - an asynchronous variant of the default call() method, which schedules the closure for processing in the thread pool and returns a Future for the result value

Examples:

GParsPool.withPool() {
    Closure longLastingCalculation = {calculate()}
    Closure fastCalculation = longLastingCalculation.async()  //create a new closure, which starts the original closure on a thread pool
    Future result=fastCalculation()                           //returns almost immediately
    //do stuff while calculation performs …
    println result.get()
}

GParsPool.withPool() {
    /**
     * The callAsync() method is an asynchronous variant of the default call() method to invoke a closure.
     * It will return a Future for the result value.
     */
    assert 6 == {it * 2}.call(3)
    assert 6 == {it * 2}.callAsync(3).get()
}

Timeouts

The callTimeoutAsync() methods, taking either a long value or a Duration instance, allow the user to have the calculation cancelled after a given time interval.

{->
    while(true) {
        Thread.sleep 1000  //Simulate a bit of interesting calculation
        if (Thread.currentThread().isInterrupted()) break;  //We've been cancelled
    }
}.callTimeoutAsync(2000)

In order to allow cancellation, the asynchronously running code must keep checking the interrupted flag of its own thread and cease the calculation once the flag is set to true.

Executor Service enhancements

The ExecutorService and jsr166y.forkjoin.ForkJoinPool classes are enhanced with the << (leftShift) operator to submit tasks to the pool and return a Future for the result.

Example:

GParsExecutorsPool.withPool {ExecutorService executorService ->
    executorService << {println 'Inside parallel task'}
}

Running functions (closures) in parallel

The GParsPool and GParsExecutorsPool classes also provide handy methods executeAsync() and executeAsyncAndWait() to easily run multiple closures asynchronously.

Example:

GParsPool.withPool {
    assertEquals([10, 20], GParsPool.executeAsyncAndWait({calculateA()}, {calculateB()}))         //waits for results
    assertEquals([10, 20], GParsPool.executeAsync({calculateA()}, {calculateB()})*.get())  //returns Futures instead and doesn't wait for results to be calculated
}

3.5 Parallel Speculations

With processor cores having become plentiful, some algorithms might benefit from brute-force parallel duplication. Instead of deciding up-front how to solve a problem, which algorithm to use or which location to connect to, you run all potential solutions in parallel.

Parallel speculations

Imagine you need to perform a task such as calculating an expensive function or reading data from a file, a database or the Internet. Luckily, you know of several good ways (e.g. functions or urls) to achieve your goal. However, they are not all equal. Although they return the same result as far as your needs are concerned, they may each take a different amount of time to complete and some of them may even fail (e.g. due to network issues). What's worse, no-one is going to tell you which path gives you the solution first nor which paths lead to no solution at all. Shall I run quick sort or merge sort on my list? Which url will work best? Is this service available at its primary location or should I use the backup one?

GPars speculations give you the option to try all the available alternatives in parallel and so get the result from the fastest functional path, silently ignoring the slow or broken ones.

This is what the speculate() methods on GParsPool and GParsExecutorsPool can do.

def numbers = …
def quickSort = …
def mergeSort = …
def sortedNumbers = speculate(quickSort, mergeSort)

Here we're performing both quick sort and merge sort concurrently, while getting the result of the faster one. Given the parallel resources available these days on mainstream hardware, running the two functions in parallel will not have a dramatic impact on the speed of either calculation, and so we get the result in about the same time as if we had run solely the faster of the two. And we get the result sooner than when running the slower one. Yet we didn't have to know up-front which of the two sorting algorithms would perform better on our data. Thus we speculated.

Similarly, downloading a document from multiple sources of different speed and reliability would look like this:

import static groovyx.gpars.GParsPool.speculate
import static groovyx.gpars.GParsPool.withPool

def alternative1 = { 'http://www.dzone.com/links/index.html'.toURL().text }

def alternative2 = { 'http://www.dzone.com/'.toURL().text }

def alternative3 = {
    'http://www.dzzzzzone.com/'.toURL().text  //wrong url
}

def alternative4 = { 'http://dzone.com/'.toURL().text }

withPool(4) {
    println speculate([alternative1, alternative2, alternative3, alternative4]).contains('groovy')
}

Make sure the surrounding thread pool has enough threads to process all alternatives in parallel. The size of the pool should match the number of closures supplied.

Alternatives using dataflow variables and streams

In cases, when stopping unsuccessful alternatives is not needed, dataflow variables or streams may be used to obtain the result value from the winning speculation.

Please refer to the Dataflow Concurrency section of the User Guide for details on Dataflow variables and streams.

import groovyx.gpars.dataflow.DataFlowQueue
import static groovyx.gpars.dataflow.DataFlow.task

def alternative1 = { 'http://www.dzone.com/links/index.html'.toURL().text }

def alternative2 = { 'http://www.dzone.com/'.toURL().text }

def alternative3 = {
    'http://www.dzzzzzone.com/'.toURL().text  //will fail due to wrong url
}

def alternative4 = { 'http://dzone.com/'.toURL().text }

//Pick either one of the following, both will work:
final def result = new DataFlowQueue()
//final def result = new DataFlowVariable()

[alternative1, alternative2, alternative3, alternative4].each {code ->
    task {
        try {
            result << code()
        } catch (ignore) { }  //We deliberately ignore unsuccessful urls
    }
}

println result.val.contains('groovy')

3.6 Fork-Join

Fork/Join or Divide and Conquer is a very powerful abstraction to solve hierarchical problems.

The abstraction

When talking about hierarchical problems, think about quick sort, merge sort, file system or general tree navigation and such.

The mighty JSR-166y library solves Fork/Join orchestration pretty nicely for us, but leaves a couple of rough edges which can hurt you if you don't pay enough attention. You still deal with threads, pools and synchronization barriers.

The GPars abstraction convenience layer

GPars can hide the complexities of dealing with threads, pools and recursive tasks from you, yet let you leverage the powerful Fork/Join implementation in jsr166y.

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

withPool() {
    println """Number of files: ${
        runForkJoin(new File("./src")) {file ->
            long count = 0
            file.eachFile {
                if (it.isDirectory()) {
                    println "Forking a child task for $it"
                    forkOffChild(it)           //fork a child task
                } else {
                    count++
                }
            }
            return count + (childrenResults.sum(0))      //use results of children tasks to calculate and store own result
        }
    }"""
}

The runForkJoin() factory method will use the supplied recursive code together with the provided values and build a hierarchical Fork/Join calculation. The number of values passed to the runForkJoin() method must match the number of expected parameters of the closure as well as the number of arguments passed into the forkOffChild() method.

def quicksort(numbers) {
    withPool {
        runForkJoin(0, numbers) {index, list ->
            def groups = list.groupBy {it <=> list[list.size().intdiv(2)]}
            if ((list.size() < 2) || (groups.size() == 1)) {
                return [index: index, list: list.clone()]
            }
            (-1..1).each {forkOffChild(it, groups[it] ?: [])}
            return [index: index, list: childrenResults.sort {it.index}.sum {it.list}]
        }.list
    }
}

Alternative approach

Alternatively, the underlying mechanism of nested Fork/Join worker tasks can be used directly. Custom-tailored workers can eliminate the performance overhead associated with parameter spreading imposed when using the generic workers. Also, custom workers can be implemented in Java and so further increase the performance of the algorithm.

public final class FileCounter extends AbstractForkJoinWorker<Long> {
    private final File file;

    def FileCounter(final File file) {
        this.file = file
    }

    @Override
    protected Long computeTask() {
        long count = 0;
        file.eachFile {
            if (it.isDirectory()) {
                println "Forking a thread for $it"
                forkOffChild(new FileCounter(it))           //fork a child task
            } else {
                count++
            }
        }
        return count + ((childrenResults)?.sum() ?: 0)      //use results of children tasks to calculate and store own result
    }
}

withPool(1) {pool ->  //feel free to experiment with the number of fork/join threads in the pool
    println "Number of files: ${runForkJoin(new FileCounter(new File("..")))}"
}

The AbstractForkJoinWorker subclasses may be written in either Java or Groovy, giving you the option to easily optimize for execution speed if the raw performance of the worker becomes a bottleneck.

Fork / Join saves your resources

Fork/Join operations can be safely run with a small number of threads thanks to the internal use of the TaskBarrier class to synchronize the threads. While a thread is blocked inside an algorithm waiting for its sub-problems to be calculated, the thread is silently returned to the pool to take on any of the available sub-problems from the task queue and process them. Although the algorithm creates as many tasks as there are sub-directories, and tasks wait for the sub-directory tasks to complete, as few as one thread is enough to keep the computation going and eventually calculate a valid result.

Mergesort example

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

/**
 * Splits a list of numbers in half
 */
def split(List<Integer> list) {
    int listSize = list.size()
    int middleIndex = listSize / 2
    def list1 = list[0..<middleIndex]
    def list2 = list[middleIndex..listSize - 1]
    return [list1, list2]
}

/**
 * Merges two sorted lists into one
 */
List<Integer> merge(List<Integer> a, List<Integer> b) {
    int i = 0, j = 0
    final int newSize = a.size() + b.size()
    List<Integer> result = new ArrayList<Integer>(newSize)

    while ((i < a.size()) && (j < b.size())) {
        if (a[i] <= b[j]) result << a[i++]
        else result << b[j++]
    }

    if (i < a.size()) result.addAll(a[i..-1])
    else result.addAll(b[j..-1])
    return result
}

final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

withPool(3) {  //feel free to experiment with the number of fork/join threads in the pool
    println """Sorted numbers: ${
        runForkJoin(numbers) {nums ->
            println "Thread ${Thread.currentThread().name[-1]}: Sorting $nums"
            switch (nums.size()) {
                case 0..1:
                    return nums                                           //store own result
                case 2:
                    if (nums[0] <= nums[1]) return nums                   //store own result
                    else return nums[-1..0]                               //store own result
                default:
                    def splitList = split(nums)
                    [splitList[0], splitList[1]].each {forkOffChild it}   //fork a child task
                    return merge(* childrenResults)                       //use results of children tasks to calculate and store own result
            }
        }
    }"""
}

Mergesort example using a custom-tailored worker class

public final class SortWorker extends AbstractForkJoinWorker<List<Integer>> {
    private final List numbers

    def SortWorker(final List<Integer> numbers) {
        this.numbers = numbers.asImmutable()
    }

    /**
     * Splits a list of numbers in half
     */
    def split(List<Integer> list) {
        int listSize = list.size()
        int middleIndex = listSize / 2
        def list1 = list[0..<middleIndex]
        def list2 = list[middleIndex..listSize - 1]
        return [list1, list2]
    }

    /**
     * Merges two sorted lists into one
     */
    List<Integer> merge(List<Integer> a, List<Integer> b) {
        int i = 0, j = 0
        final int newSize = a.size() + b.size()
        List<Integer> result = new ArrayList<Integer>(newSize)

        while ((i < a.size()) && (j < b.size())) {
            if (a[i] <= b[j]) result << a[i++]
            else result << b[j++]
        }

        if (i < a.size()) result.addAll(a[i..-1])
        else result.addAll(b[j..-1])
        return result
    }

    /**
     * Sorts a small list or delegates to two children, if the list contains more than two elements.
     */
    @Override
    protected List<Integer> computeTask() {
        println "Thread ${Thread.currentThread().name[-1]}: Sorting $numbers"
        switch (numbers.size()) {
            case 0..1:
                return numbers                                                                       //store own result
            case 2:
                if (numbers[0] <= numbers[1]) return numbers                                         //store own result
                else return numbers[-1..0]                                                           //store own result
            default:
                def splitList = split(numbers)
                [new SortWorker(splitList[0]), new SortWorker(splitList[1])].each {forkOffChild it}  //fork a child task
                return merge(* childrenResults)                                                      //use results of children tasks to calculate and store own result
        }
    }
}

final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

withPool(1) {  //feel free to experiment with the number of fork/join threads in the pool
    println "Sorted numbers: ${runForkJoin(new SortWorker(numbers))}"
}

Availability

This feature is only available when using the Fork/Join-based GParsPool, not GParsExecutorsPool.